package org.hawkular.metrics.scheduler.impl;

import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Statement;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.hawkular.metrics.datetime.DateTimeService;
import org.hawkular.metrics.scheduler.api.JobDetails;
import org.hawkular.metrics.scheduler.api.JobStatus;
import org.hawkular.metrics.scheduler.api.RetryPolicy;
import org.hawkular.metrics.scheduler.api.Scheduler;
import org.hawkular.metrics.scheduler.api.SingleExecutionTrigger;
import org.hawkular.metrics.scheduler.api.Trigger;
import org.hawkular.rx.cassandra.driver.RxSession;
import org.jboss.logging.Logger;
import org.joda.time.DateTime;
import org.joda.time.Minutes;
import rx.Completable;
import rx.Observable;
import rx.Single;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;

/* loaded from: input_file:WEB-INF/lib/hawkular-metrics-job-scheduler-0.21.1.Final.jar:org/hawkular/metrics/scheduler/impl/SchedulerImpl.class */
public class SchedulerImpl implements Scheduler {
    private RxSession session;
    private PreparedStatement deleteScheduleJob;
    private LockManager lockManager;
    private JobsService jobsService;
    private boolean running;
    static final String QUEUE_LOCK_PREFIX = "org.hawkular.metrics.scheduler.queue.";
    static final String SCHEDULING_LOCK = "scheduling";
    static final String TIME_SLICE_EXECUTION_LOCK = "executing";
    static final String JOB_EXECUTION_LOCK = "locked";
    static final int SCHEDULING_LOCK_TIMEOUT_IN_SEC = 5;
    static final int JOB_EXECUTION_LOCK_TIMEOUT_IN_SEC = 3600;
    private static Logger logger = Logger.getLogger(SchedulerImpl.class);
    private static Func2<JobDetails, Throwable, RetryPolicy> NO_RETRY = (jobDetails, th) -> {
        return RetryPolicy.NONE;
    };
    private AtomicInteger ticks = new AtomicInteger();
    private final Object lock = new Object();
    private Map<String, Func1<JobDetails, Completable>> jobFactories = new HashMap();
    private Map<String, Func2<JobDetails, Throwable, RetryPolicy>> retryFunctions = new HashMap();
    private ScheduledExecutorService tickExecutor = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setNameFormat("ticker-pool-%d").build());
    private rx.Scheduler tickScheduler = Schedulers.from(this.tickExecutor);
    private ExecutorService queryExecutor = new ThreadPoolExecutor(getQueryThreadPoolSize(), getQueryThreadPoolSize(), 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), new ThreadFactoryBuilder().setNameFormat("query-thread-pool-%d").build(), new ThreadPoolExecutor.DiscardPolicy());
    private rx.Scheduler queryScheduler = Schedulers.from(this.queryExecutor);
    private PreparedStatement insertJob = initQuery("INSERT INTO jobs (id, type, name, params, trigger) VALUES (?, ?, ?, ?, ?)");
    private PreparedStatement insertScheduledJob = initQuery("INSERT INTO scheduled_jobs_idx (time_slice, job_id) VALUES (?, ?)");
    private PreparedStatement findScheduledJobs = initQuery("SELECT job_id FROM scheduled_jobs_idx WHERE time_slice = ?");
    private PreparedStatement deleteScheduledJobs = initQuery("DELETE FROM scheduled_jobs_idx WHERE time_slice = ?");
    private PreparedStatement deleteScheduledJob = initQuery("DELETE FROM scheduled_jobs_idx WHERE time_slice = ? AND job_id = ?");
    private PreparedStatement findFinishedJobs = initQuery("SELECT job_id FROM finished_jobs_idx WHERE time_slice = ?");
    private PreparedStatement deleteFinishedJobs = initQuery("DELETE FROM finished_jobs_idx WHERE time_slice = ?");
    private PreparedStatement updateJobToFinished = initQuery("INSERT INTO finished_jobs_idx (time_slice, job_id) VALUES (?, ?)");
    private PreparedStatement findJob = initQuery("SELECT type, name, params, trigger FROM jobs WHERE id = ?");
    private PreparedStatement findAllJobs = initQuery("SELECT id, type, name, params, trigger FROM jobs");
    private PreparedStatement addActiveTimeSlice = initQuery("INSERT INTO active_time_slices (time_slice) VALUES (?)");
    private PreparedStatement findActiveTimeSlices = initQuery("SELECT DISTINCT time_slice FROM active_time_slices");
    private PreparedStatement deleteActiveTimeSlice = initQuery("DELETE FROM active_time_slices WHERE time_slice = ?");
    private Optional<PublishSubject<Date>> finishedTimeSlices = Optional.empty();
    private Optional<PublishSubject<JobDetails>> jobFinished = Optional.empty();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/hawkular-metrics-job-scheduler-0.21.1.Final.jar:org/hawkular/metrics/scheduler/impl/SchedulerImpl$JobExecutionState.class */
    public static class JobExecutionState {
        final JobDetails currentDetails;
        final JobDetails nextDetails;
        final Set<UUID> activeJobs;
        final Date timeSlice;
        final Date nextTimeSlice;

        public JobExecutionState(JobDetails jobDetails, Set<UUID> set) {
            this.currentDetails = jobDetails;
            this.activeJobs = set;
            this.timeSlice = new Date(jobDetails.getTrigger().getTriggerTime());
            this.nextDetails = null;
            this.nextTimeSlice = null;
        }

        public JobExecutionState(JobDetails jobDetails, Date date, JobDetails jobDetails2, Date date2, Set<UUID> set) {
            this.currentDetails = jobDetails;
            this.timeSlice = date;
            this.nextDetails = jobDetails2;
            this.nextTimeSlice = date2;
            this.activeJobs = set;
        }

        public boolean isRepeating() {
            return (this.nextDetails == null || this.nextTimeSlice == null) ? false : true;
        }

        public boolean isBehindSchedule() {
            return isRepeating() && this.nextTimeSlice.getTime() > this.nextDetails.getTrigger().getTriggerTime();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/hawkular-metrics-job-scheduler-0.21.1.Final.jar:org/hawkular/metrics/scheduler/impl/SchedulerImpl$JobLock.class */
    public static class JobLock {
        final JobDetails jobDetails;
        final boolean acquired;
        final String name;

        public JobLock(JobDetails jobDetails, boolean z) {
            this.jobDetails = jobDetails;
            this.acquired = z;
            this.name = "org.hawkular.metrics.scheduler.job." + jobDetails.getJobId();
        }
    }

    /* loaded from: input_file:WEB-INF/lib/hawkular-metrics-job-scheduler-0.21.1.Final.jar:org/hawkular/metrics/scheduler/impl/SchedulerImpl$QueryExecution.class */
    private static class QueryExecution {
        private Statement query;
        private ResultSet resultSet;
        private Throwable error;

        public QueryExecution(Statement statement, ResultSet resultSet) {
            this.resultSet = resultSet;
        }

        public QueryExecution(Statement statement, Throwable th) {
            this.error = th;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/hawkular-metrics-job-scheduler-0.21.1.Final.jar:org/hawkular/metrics/scheduler/impl/SchedulerImpl$TimeSliceLock.class */
    public static class TimeSliceLock {
        private Date timeSlice;
        private String name;
        private boolean acquired;

        public TimeSliceLock(Date date, String str, boolean z) {
            this.timeSlice = date;
            this.name = str;
            this.acquired = z;
        }

        public Date getTimeSlice() {
            return this.timeSlice;
        }

        public String getName() {
            return this.name;
        }

        public boolean isAcquired() {
            return this.acquired;
        }
    }

    public SchedulerImpl(RxSession rxSession) {
        this.session = rxSession;
        this.lockManager = new LockManager(rxSession);
        this.jobsService = new JobsService(rxSession);
    }

    private PreparedStatement initQuery(String str) {
        return this.session.getSession().prepare(str).setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
    }

    private int getQueryThreadPoolSize() {
        return Math.max(Runtime.getRuntime().availableProcessors() / 2, 1);
    }

    public void setTickScheduler(rx.Scheduler scheduler) {
        this.tickScheduler = scheduler;
    }

    public void setTimeSlicesSubject(PublishSubject<Date> publishSubject) {
        this.finishedTimeSlices = Optional.of(publishSubject);
    }

    public void setJobFinishedSubject(PublishSubject<JobDetails> publishSubject) {
        this.jobFinished = Optional.of(publishSubject);
    }

    @Override // org.hawkular.metrics.scheduler.api.Scheduler
    public void register(String str, Func1<JobDetails, Completable> func1) {
        this.jobFactories.put(str, func1);
    }

    @Override // org.hawkular.metrics.scheduler.api.Scheduler
    public void register(String str, Func1<JobDetails, Completable> func1, Func2<JobDetails, Throwable, RetryPolicy> func2) {
        this.jobFactories.put(str, func1);
        this.retryFunctions.put(str, func2);
    }

    @Override // org.hawkular.metrics.scheduler.api.Scheduler
    public Single<JobDetails> scheduleJob(String str, String str2, Map<String, String> map, Trigger trigger) {
        if (DateTimeService.now.get().getMillis() >= trigger.getTriggerTime()) {
            return Single.error(new RuntimeException("Trigger time has already passed"));
        }
        String str3 = QUEUE_LOCK_PREFIX + trigger.getTriggerTime();
        return this.lockManager.acquireSharedLock(str3, SCHEDULING_LOCK, 5).map(bool -> {
            if (bool.booleanValue()) {
                return new JobDetails(UUID.randomUUID(), str, str2, map, trigger);
            }
            throw new RuntimeException("Failed to acquire scheduling lock [" + str3 + "]");
        }).flatMap(jobDetails -> {
            return this.jobsService.insert(new Date(trigger.getTriggerTime()), jobDetails).map(resultSet -> {
                return jobDetails;
            });
        }).toSingle();
    }

    @Override // org.hawkular.metrics.scheduler.api.Scheduler
    public void start() {
        this.running = true;
        ConcurrentSkipListSet concurrentSkipListSet = new ConcurrentSkipListSet();
        ConcurrentSkipListSet concurrentSkipListSet2 = new ConcurrentSkipListSet();
        doOnTick(() -> {
            logger.debug("Activating scheduler for [" + DateTimeService.currentMinute().toDate() + "]");
            Date date = DateTimeService.currentMinute().toDate();
            updateActiveTimeSlices(date).flatMap(r3 -> {
                return findTimeSlices();
            }).filter(date2 -> {
                synchronized (this.lock) {
                    if (concurrentSkipListSet.contains(date2)) {
                        return false;
                    }
                    concurrentSkipListSet.add(date2);
                    return true;
                }
            }).doOnNext(date3 -> {
                logger.debug("Running job scheduler for [" + date3 + "]");
            }).flatMap(this::acquireTimeSliceLock).flatMap(timeSliceLock -> {
                return findScheduledJobs(timeSliceLock.getTimeSlice()).doOnError(th -> {
                    logger.warn("Failed to find scheduled jobs for time slice " + timeSliceLock.timeSlice);
                }).doOnNext(set -> {
                    logger.debug("[" + timeSliceLock.timeSlice + "] scheduled jobs: " + set);
                }).flatMap(set2 -> {
                    return computeRemainingJobs(set2, timeSliceLock.getTimeSlice(), concurrentSkipListSet2);
                }).doOnNext(set3 -> {
                    logger.debug("[" + date + "] remaining jobs: " + set3);
                }).flatMap((v0) -> {
                    return Observable.from(v0);
                }).filter(jobDetails -> {
                    return Boolean.valueOf(!concurrentSkipListSet2.contains(jobDetails.getJobId()));
                }).flatMap(this::acquireJobLock).filter(jobLock -> {
                    return Boolean.valueOf(jobLock.acquired);
                }).map(jobLock2 -> {
                    return jobLock2.jobDetails;
                }).doOnNext(jobDetails2 -> {
                    logger.debug("Acquired job lock for " + jobDetails2 + " in time slice " + timeSliceLock.timeSlice);
                }).flatMap(jobDetails3 -> {
                    return executeJob(jobDetails3, timeSliceLock.timeSlice, concurrentSkipListSet2).toObservable().map(obj -> {
                        return timeSliceLock.getTimeSlice();
                    });
                }).defaultIfEmpty(timeSliceLock.getTimeSlice());
            }).flatMap(date4 -> {
                return Observable.sequenceEqual(this.jobsService.findScheduledJobs(date4, this.queryScheduler).map((v0) -> {
                    return v0.getJobId();
                }).collect(HashSet::new, (v0, v1) -> {
                    v0.add(v1);
                }), findFinishedJobs(date4)).flatMap(bool -> {
                    if (!bool.booleanValue()) {
                        return Observable.just(date4);
                    }
                    logger.debug("All jobs for time slice [" + date4 + "] have finished");
                    return Completable.merge(deleteActiveTimeSlice(date4), deleteFinishedJobs(date4), deleteScheduledJobs(date4)).toObservable().reduce(null, (obj, obj2) -> {
                        return obj2;
                    }).map(obj3 -> {
                        return date4;
                    });
                });
            }).subscribe(date5 -> {
                logger.debug("Finished post job execution clean up for [" + date5 + "]");
                concurrentSkipListSet.remove(date5);
                this.finishedTimeSlices.ifPresent(publishSubject -> {
                    publishSubject.onNext(date5);
                });
            }, th -> {
                logger.warn("Job execution failed", th);
            }, () -> {
                logger.debug("Done!");
            });
        });
    }

    private Observable<TimeSliceLock> acquireTimeSliceLock(Date date) {
        String str = QUEUE_LOCK_PREFIX + date.getTime();
        int i = 5;
        return Observable.create(subscriber -> {
            Observable<R> map = this.lockManager.acquireSharedLock(str, TIME_SLICE_EXECUTION_LOCK, 3600).map(bool -> {
                if (bool.booleanValue()) {
                    return new TimeSliceLock(date, str, bool.booleanValue());
                }
                logger.debug("Failed to acquire time slice lock for [" + date + "]. Will attempt to acquire it again in " + i + " seconds.");
                throw new RuntimeException();
            });
            subscriber.getClass();
            Action1 action1 = (v1) -> {
                r1.onNext(v1);
            };
            subscriber.getClass();
            Action1<Throwable> action12 = subscriber::onError;
            subscriber.getClass();
            map.subscribe(action1, action12, subscriber::onCompleted);
        }).retryWhen(observable -> {
            return observable.flatMap(th -> {
                return Observable.timer(i, TimeUnit.SECONDS, this.queryScheduler);
            });
        });
    }

    private Observable<JobLock> acquireJobLock(JobDetails jobDetails) {
        return this.lockManager.acquireExclusiveLock("org.hawkular.metrics.scheduler.job." + jobDetails.getJobId(), JOB_EXECUTION_LOCK, 3600).map(bool -> {
            return new JobLock(jobDetails, bool.booleanValue());
        });
    }

    private Completable executeJob(JobDetails jobDetails, Date date, Set<UUID> set) {
        logger.debug("Starting execution for " + jobDetails + " in time slice [" + date + "]");
        Stopwatch createStarted = Stopwatch.createStarted();
        Func1<JobDetails, Completable> func1 = this.jobFactories.get(jobDetails.getJobType());
        return (jobDetails.getStatus() == JobStatus.FINISHED ? Completable.complete() : func1.call(jobDetails)).onErrorResumeNext(th -> {
            logger.info("Execution of " + jobDetails + " in time slice [" + date + "] failed", th);
            final RetryPolicy call = this.retryFunctions.getOrDefault(jobDetails.getJobType(), NO_RETRY).call(jobDetails, th);
            if (call == RetryPolicy.NONE) {
                return Completable.complete();
            }
            if (jobDetails.getTrigger().nextTrigger() != null) {
                logger.warn("Retry policies cannot be used with jobs that repeat. " + jobDetails + " will execute again according to its next trigger.");
                return Completable.complete();
            }
            if (call == RetryPolicy.NOW) {
                return (Completable) func1.call(jobDetails);
            }
            return reschedule(new JobExecutionState(new JobDetails(jobDetails.getJobId(), jobDetails.getJobType(), jobDetails.getJobName(), jobDetails.getParameters(), new Trigger() { // from class: org.hawkular.metrics.scheduler.impl.SchedulerImpl.1
                @Override // org.hawkular.metrics.scheduler.api.Trigger
                public long getTriggerTime() {
                    return jobDetails.getTrigger().getTriggerTime();
                }

                @Override // org.hawkular.metrics.scheduler.api.Trigger
                public Trigger nextTrigger() {
                    return new SingleExecutionTrigger.Builder().withDelay(call.getDelay(), TimeUnit.MILLISECONDS).build();
                }
            }), set)).toCompletable();
        }).doOnCompleted(() -> {
            createStarted.stop();
            if (logger.isDebugEnabled()) {
                logger.debug("Finished executing " + jobDetails + " in time slice [" + date + "] " + createStarted.elapsed(TimeUnit.MILLISECONDS) + " ms");
            }
        }).toSingle(() -> {
            return new JobExecutionState(jobDetails, set);
        }).flatMap(jobExecutionState -> {
            return this.jobsService.updateStatusToFinished(date, jobExecutionState.currentDetails.getJobId()).toSingle().map(resultSet -> {
                return jobExecutionState;
            });
        }).flatMap(this::reschedule).flatMap(jobExecutionState2 -> {
            return jobExecutionState2.isBehindSchedule() ? setJobFinished(jobExecutionState2).flatMap(this::scheduleImmediateExecutionIfNecessary) : releaseJobExecutionLock(jobExecutionState2).flatMap(this::deactivate).flatMap(this::setJobFinished);
        }).doOnError(th2 -> {
            logger.debug("There was an error during post-job execution. Making sure " + jobDetails + " is removed from active jobs cache");
            set.remove(jobDetails.getJobId());
            publishJobFinished(jobDetails);
        }).doOnSuccess(jobExecutionState3 -> {
            publishJobFinished(jobExecutionState3.currentDetails);
        }).toCompletable().subscribeOn(Schedulers.io());
    }

    private void publishJobFinished(JobDetails jobDetails) {
        this.jobFinished.ifPresent(publishSubject -> {
            publishSubject.onNext(jobDetails);
        });
    }

    private Single<JobExecutionState> setJobFinished(JobExecutionState jobExecutionState) {
        return this.session.execute(this.updateJobToFinished.bind(jobExecutionState.timeSlice, jobExecutionState.currentDetails.getJobId()), this.queryScheduler).toSingle().map(resultSet -> {
            return jobExecutionState;
        }).doOnError(th -> {
            logger.warn("There was an error while updating the finished jobs index for " + jobExecutionState.currentDetails, th);
        });
    }

    private Single<JobExecutionState> reschedule(JobExecutionState jobExecutionState) {
        Trigger nextTrigger = jobExecutionState.currentDetails.getTrigger().nextTrigger();
        if (nextTrigger == null) {
            logger.debug("No more scheduled executions for " + jobExecutionState.currentDetails);
            return Single.just(jobExecutionState);
        }
        JobDetails jobDetails = jobExecutionState.currentDetails;
        JobDetails jobDetails2 = new JobDetails(jobDetails.getJobId(), jobDetails.getJobType(), jobDetails.getJobName(), jobDetails.getParameters(), nextTrigger);
        if (nextTrigger.getTriggerTime() <= DateTimeService.now.get().getMillis()) {
            logger.info(jobDetails + " missed its next execution at " + nextTrigger.getTriggerTime() + ". It will be rescheduled for immediate execution");
            AtomicLong atomicLong = new AtomicLong(DateTimeService.currentMinute().getMillis());
            return Observable.defer(() -> {
                return this.lockManager.acquireSharedLock(QUEUE_LOCK_PREFIX + atomicLong.addAndGet(60000L), SCHEDULING_LOCK, 5);
            }).map(bool -> {
                if (bool.booleanValue()) {
                    return bool;
                }
                throw new RuntimeException();
            }).retry().map(bool2 -> {
                return new JobExecutionState(jobExecutionState.currentDetails, jobExecutionState.timeSlice, jobDetails2, new Date(atomicLong.get()), jobExecutionState.activeJobs);
            }).flatMap(jobExecutionState2 -> {
                return this.jobsService.insert(jobExecutionState2.nextTimeSlice, jobExecutionState2.nextDetails).map(resultSet -> {
                    return jobExecutionState2;
                });
            }).toSingle();
        }
        logger.debug("Scheduling " + jobDetails2 + " for next execution at " + new Date(nextTrigger.getTriggerTime()));
        JobExecutionState jobExecutionState3 = new JobExecutionState(jobDetails, jobExecutionState.timeSlice, jobDetails2, new Date(nextTrigger.getTriggerTime()), jobExecutionState.activeJobs);
        return this.jobsService.insert(jobExecutionState3.nextTimeSlice, jobExecutionState3.nextDetails).map(resultSet -> {
            return jobExecutionState3;
        }).toSingle();
    }

    private Single<JobExecutionState> scheduleImmediateExecutionIfNecessary(JobExecutionState jobExecutionState) {
        Schedulers.io().createWorker().schedule(() -> {
            logger.debug("Starting immediate execution of " + jobExecutionState.nextDetails);
            this.lockManager.renewLock("org.hawkular.metrics.scheduler.job." + jobExecutionState.nextDetails.getJobId(), JOB_EXECUTION_LOCK, 3600).map(bool -> {
                if (bool.booleanValue()) {
                    return bool;
                }
                throw new RuntimeException("Failed to renew job lock for " + jobExecutionState.nextDetails);
            }).toCompletable().concatWith(executeJob(jobExecutionState.nextDetails, jobExecutionState.nextTimeSlice, jobExecutionState.activeJobs)).subscribe(() -> {
                logger.debug("Finished executing " + jobExecutionState.nextDetails);
            }, th -> {
                logger.warn("There was an error executing " + jobExecutionState.nextDetails);
            });
        });
        return Single.just(jobExecutionState);
    }

    private Single<JobExecutionState> deactivate(JobExecutionState jobExecutionState) {
        logger.debug("Removing " + jobExecutionState.currentDetails + " from active jobs " + jobExecutionState.activeJobs);
        jobExecutionState.activeJobs.remove(jobExecutionState.currentDetails.getJobId());
        return Single.just(jobExecutionState);
    }

    private Single<JobExecutionState> releaseJobExecutionLock(JobExecutionState jobExecutionState) {
        String str = "org.hawkular.metrics.scheduler.job." + jobExecutionState.currentDetails.getJobId();
        return this.lockManager.releaseLock(str, JOB_EXECUTION_LOCK).map(bool -> {
            if (bool.booleanValue()) {
                return jobExecutionState;
            }
            logger.warn("Failed to release job lock for " + jobExecutionState.currentDetails);
            throw new RuntimeException("Failed to release job lock for " + jobExecutionState.currentDetails);
        }).toSingle().doOnError(th -> {
            logger.warn("There was an error trying to release job lock [" + str + "] for " + jobExecutionState.currentDetails, th);
        });
    }

    private Completable deleteScheduledJobs(Date date) {
        return this.session.execute(this.deleteScheduledJobs.bind(date), this.queryScheduler).doOnCompleted(() -> {
            logger.debug("Deleted scheduled jobs time slice [" + date + "]");
        }).toCompletable();
    }

    private Completable deleteFinishedJobs(Date date) {
        return this.session.execute(this.deleteFinishedJobs.bind(date), this.queryScheduler).doOnCompleted(() -> {
            logger.debug("Deleted finished jobs time slice [" + date + "]");
        }).toCompletable();
    }

    private Completable deleteActiveTimeSlice(Date date) {
        return this.session.execute(this.deleteActiveTimeSlice.bind(date), this.queryScheduler).doOnCompleted(() -> {
            logger.debug("Deleted active time slice [" + date + "]");
        }).toCompletable();
    }

    @Override // org.hawkular.metrics.scheduler.api.Scheduler
    public void shutdown() {
        try {
            this.running = false;
            this.tickExecutor.shutdown();
            this.tickExecutor.awaitTermination(5L, TimeUnit.SECONDS);
            this.queryExecutor.shutdown();
            this.queryExecutor.awaitTermination(30L, TimeUnit.SECONDS);
            logger.info("Shutdown complete");
        } catch (InterruptedException e) {
            logger.warn("Interrupted during shutdown", e);
        }
    }

    void reset(rx.Scheduler scheduler) {
        logger.debug("Starting reset");
        shutdown();
        this.jobFactories = new HashMap();
        this.tickScheduler = scheduler;
        this.queryExecutor = Executors.newFixedThreadPool(getQueryThreadPoolSize(), new ThreadFactoryBuilder().setNameFormat("query-thread-pool-%d").build());
        this.queryScheduler = Schedulers.from(this.queryExecutor);
    }

    private Observable<? extends Set<JobDetails>> findScheduledJobs(Date date) {
        logger.debug("Fetching scheduled jobs for [" + date + "]");
        return this.jobsService.findScheduledJobs(date, this.queryScheduler).collect(HashSet::new, (v0, v1) -> {
            v0.add(v1);
        });
    }

    private Observable<Set<JobDetails>> computeRemainingJobs(Set<JobDetails> set, Date date, Set<UUID> set2) {
        return findFinishedJobs(date).map(set3 -> {
            HashSet hashSet = new HashSet(set2);
            hashSet.removeAll(set3);
            return (Set) ((Set) set.stream().filter(jobDetails -> {
                return !set3.contains(jobDetails.getJobId());
            }).collect(Collectors.toSet())).stream().filter(jobDetails2 -> {
                return !hashSet.contains(jobDetails2.getJobId());
            }).collect(Collectors.toSet());
        });
    }

    private Observable<? extends Set<UUID>> findFinishedJobs(Date date) {
        return this.session.execute(this.findFinishedJobs.bind(date), this.queryScheduler).flatMap((v0) -> {
            return Observable.from(v0);
        }).map(row -> {
            return row.getUUID(0);
        }).collect(HashSet::new, (v0, v1) -> {
            v0.add(v1);
        });
    }

    private void doOnTick(Action0 action0) {
        Action0 action02 = () -> {
            Date date = DateTimeService.getTimeSlice(new DateTime(this.tickScheduler.now()), Minutes.minutes(1).toStandardDuration()).toDate();
            logger.debug("[TICK][" + date + "] executing action");
            action0.call();
            logger.debug("Finished tick for [" + date + "]");
        };
        AtomicReference atomicReference = new AtomicReference();
        Observable.interval(0L, 1L, TimeUnit.MINUTES, this.tickScheduler).doOnNext(l -> {
            logger.debug("CURRENT MINUTE = " + DateTimeService.currentMinute().toDate());
        }).filter(l2 -> {
            DateTime currentMinute = DateTimeService.currentMinute();
            if (atomicReference.get() == null) {
                atomicReference.set(currentMinute);
                return true;
            }
            logger.debug("previous=[" + ((DateTime) atomicReference.get()).toLocalDateTime() + "], current=[" + currentMinute.toLocalDateTime() + "]");
            if (((DateTime) atomicReference.get()).equals(currentMinute)) {
                return false;
            }
            atomicReference.set(currentMinute);
            return true;
        }).takeUntil(l3 -> {
            return Boolean.valueOf(!this.running);
        }).subscribe(l4 -> {
            action02.call();
        }, th -> {
            logger.warn(th);
        });
    }

    private Observable<Void> updateActiveTimeSlices(Date date) {
        return this.session.execute(this.addActiveTimeSlice.bind(date)).map(resultSet -> {
            return null;
        });
    }

    private Observable<Date> findTimeSlices() {
        return this.session.execute(this.findActiveTimeSlices.bind(), this.queryScheduler).flatMap((v0) -> {
            return Observable.from(v0);
        }).map(row -> {
            return row.getTimestamp(0);
        }).toSortedList().flatMap((v0) -> {
            return Observable.from(v0);
        });
    }
}
